package vip.aster.websocket.utils;

import cn.hutool.core.collection.CollUtil;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import vip.aster.common.utils.CacheUtils;
import vip.aster.websocket.config.WebSocketUsers;
import vip.aster.websocket.constant.WebSocketConstants;
import vip.aster.websocket.vo.WebSocketMessageVO;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Static helpers for sending WebSocket messages.
 *
 * <p>Messages addressed to sessions registered in {@link WebSocketUsers} are written
 * directly to the session; all other messages are published to the Redis topic
 * {@link WebSocketConstants#WEB_SOCKET_TOPIC}.
 *
 * @author Aster
 * @since 2024/3/11 16:55
 */
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketUtils {
    // Template used to publish to the WebSocket topic. It is resolved once, when this
    // class is initialized.
    // NOTE(review): kept public and non-final for backward compatibility; presumably
    // CacheUtils must already be able to supply the template at class-load time - confirm.
    public static RedisTemplate<Object, Object> redisTemplate = CacheUtils.getRedisTemplate();

    /**
     * Sends a text message to the session registered under the given key.
     *
     * <p>If the looked-up session is {@code null} or not open, the message is dropped
     * and a warning is logged.
     *
     * @param sessionKey session key (typically the user id)
     * @param message    message text
     */
    public static void sendMessage(String sessionKey, String message) {
        WebSocketSession session = WebSocketUsers.getSessions(sessionKey);
        sendMessage(session, message);
    }

    /**
     * Delivers a message to the given session keys.
     *
     * <p>Keys for which {@link WebSocketUsers#existSession} reports a session are sent
     * to directly. The remaining keys are published in a single message to the Redis
     * topic. A {@code null} message object, or a {@code null}/empty key list, is ignored.
     *
     * @param webSocketMessage message object carrying the target session keys and the text
     */
    public static void publishMessage(WebSocketMessageVO webSocketMessage) {
        // An empty key list was already a no-op; treat null the same way instead of
        // failing with a NullPointerException in the loop below.
        if (webSocketMessage == null || CollUtil.isEmpty(webSocketMessage.getSessionKeys())) {
            log.warn("[publish] sessionKeys为空, 忽略发送");
            return;
        }

        List<String> unsentSessionKeys = new ArrayList<>();
        // Sessions held by this service instance: send directly.
        for (String sessionKey : webSocketMessage.getSessionKeys()) {
            if (WebSocketUsers.existSession(sessionKey)) {
                WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
                continue;
            }
            unsentSessionKeys.add(sessionKey);
        }
        // Sessions not held by this service instance: publish to the topic.
        if (CollUtil.isNotEmpty(unsentSessionKeys)) {
            WebSocketMessageVO broadcastMessage = new WebSocketMessageVO();
            broadcastMessage.setMessage(webSocketMessage.getMessage());
            broadcastMessage.setSessionKeys(unsentSessionKeys);
            redisTemplate.convertAndSend(WebSocketConstants.WEB_SOCKET_TOPIC, broadcastMessage);

            log.info(" WebSocket发送主题订阅消息topic={} sessionKeys={} message={}",
                    WebSocketConstants.WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
        }
    }

    /**
     * Publishes a message to the Redis topic without setting any session keys.
     *
     * <p>NOTE(review): presumably subscribers treat a message without session keys as a
     * broadcast to every session - confirm against the topic listener.
     *
     * @param message message text
     */
    public static void publishAll(String message) {
        WebSocketMessageVO broadcastMessage = new WebSocketMessageVO();
        broadcastMessage.setMessage(message);
        redisTemplate.convertAndSend(WebSocketConstants.WEB_SOCKET_TOPIC, broadcastMessage);
        log.info("WebSocket发送主题订阅消息topic:{} message:{}", WebSocketConstants.WEB_SOCKET_TOPIC, message);
    }

    /**
     * Sends a pong frame to the given session.
     *
     * @param session target session; a {@code null} or closed session is skipped with a warning
     */
    public static void sendPongMessage(WebSocketSession session) {
        sendMessage(session, new PongMessage());
    }

    /**
     * Sends a text message to the given session.
     *
     * @param session target session; a {@code null} or closed session is skipped with a warning
     * @param message message text
     */
    public static void sendMessage(WebSocketSession session, String message) {
        sendMessage(session, new TextMessage(message));
    }

    /**
     * Writes a message to the session if it is open.
     *
     * <p>An {@link IOException} raised while sending is logged and not rethrown.
     *
     * @param session target session, may be {@code null}
     * @param message message to send
     */
    private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
        if (session == null || !session.isOpen()) {
            log.warn("[send] session会话已经关闭");
            return;
        }
        try {
            // The underlying standard WebSocket session does not allow concurrent sends,
            // and these static helpers can be invoked from any thread, so sends are
            // serialized per session.
            synchronized (session) {
                session.sendMessage(message);
            }
        } catch (IOException e) {
            log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
        }
    }
}
